package com.xiaofan.java;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Matches each keyed START event with the following END event for the same key
 * and emits the elapsed event time between the two.
 *
 * <p>Input records are {@code Tuple2<String, String>}: {@code f0} is the key and
 * {@code f1} is the event marker ({@code "START"} or {@code "END"}). Output records
 * are {@code Tuple2<String, Long>}: the key and the difference between the END and
 * START record timestamps.
 *
 * <p>A START that is not followed by an END within {@link #START_TIMEOUT_MS} of
 * event time is discarded by an event-time timer so that state does not accumulate
 * for keys that never finish.
 *
 * <p>The record timestamp ({@code ctx.timestamp()}) is unboxed without a null check,
 * so records are expected to carry event-time timestamps.
 */
public class StartEndDuration_C0004 extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {

    /** How long (in event-time milliseconds) a START waits for its END: 4 hours. */
    private static final long START_TIMEOUT_MS = 4L * 60 * 60 * 1000;

    /** Per-key timestamp of the pending START event; empty when no START is pending. */
    private transient ValueState<Long> startTime;

    /**
     * Obtains the keyed state handle that stores the pending START timestamp.
     *
     * @param parameters the configuration passed in by the runtime (unused)
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        startTime = getRuntimeContext().getState(new ValueStateDescriptor<>("startTime", Long.class));
    }

    /**
     * Handles one START or END record for the current key.
     *
     * <p>START stores the record timestamp and arms the timeout timer, replacing any
     * START that was still pending. END emits {@code (key, endTimestamp - startTimestamp)}
     * if a START is pending and then clears it; an END without a pending START is ignored.
     *
     * @param in  the input record: {@code f0} is the key, {@code f1} the event marker
     * @param ctx gives access to the record timestamp and the timer service
     * @param out collector for the {@code (key, duration)} results
     */
    @Override
    public void processElement(Tuple2<String, String> in, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        switch (in.f1) {
            case "START": {
                // A START that is still pending is superseded: drop its timer so it
                // cannot fire later and wipe the state belonging to this newer START.
                Long previousStart = startTime.value();
                if (previousStart != null) {
                    ctx.timerService().deleteEventTimeTimer(previousStart + START_TIMEOUT_MS);
                }

                // Remember when this START happened and arm its timeout.
                long startTimestamp = ctx.timestamp();
                startTime.update(startTimestamp);
                ctx.timerService().registerEventTimeTimer(startTimestamp + START_TIMEOUT_MS);
                break;
            }

            case "END": {
                Long pendingStart = startTime.value();
                if (pendingStart != null) {
                    out.collect(Tuple2.of(in.f0, ctx.timestamp() - pendingStart));

                    // The pair is complete: the timeout is no longer needed.
                    ctx.timerService().deleteEventTimeTimer(pendingStart + START_TIMEOUT_MS);
                    startTime.clear();
                }
                break;
            }

            default:
                // Records with any other marker are ignored.
                break;
        }
    }

    /**
     * Discards a START whose timeout elapsed without a matching END.
     *
     * <p>The state is cleared only when the firing timer belongs to the START that is
     * currently stored. A timer left over from an earlier START must not remove a
     * newer START that is still inside its own timeout window.
     *
     * @param timestamp the event time the firing timer was registered for
     * @param ctx       timer context (unused)
     * @param out       collector for results (nothing is emitted on timeout)
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        Long pendingStart = startTime.value();
        if (pendingStart != null && timestamp == pendingStart + START_TIMEOUT_MS) {
            startTime.clear();
        }
    }
}
